package main

import (
	"context"
	"fmt"
	segmentioKafka "github.com/segmentio/kafka-go"
	"hash/fnv"
)

// Notice 使用 segmentio/kafka-go 包根据用户id往kafka中的指定分区发送消息

const (
	brokerAddress = "localhost:9092"
	topic         = "my-topic"
	numPartitions = 6
)

func main() {
	// 创建 Kafka 生产者
	// Notice NewWriter 方法会在1.0版本后弃用
	//writer := segmentioKafka.NewWriter(segmentioKafka.WriterConfig{
	//	Brokers:  []string{brokerAddress},
	//	Topic:    topic,
	//	Balancer: &segmentioKafka.Hash{},
	//})

	ctx := context.Background()

	// Notice
	writer := &segmentioKafka.Writer{
		Topic:    topic,
		Addr:     segmentioKafka.TCP([]string{brokerAddress}...),
		Balancer: &segmentioKafka.Hash{},
		// Notice 异步发送消息，加上这个参数后 即使下面用for循环不用并发，发消息也是异步的
		Async: true,
		//Transport: sharedTransport, // Notice TLS连接的时候需要
	}

	// 模拟多个用户发送消息
	userIDs := []string{"user1", "user1", "user1", "user1", "user2", "user3", "user4"}

	for idx, userId := range userIDs {
		fmt.Println("userId: ", userId)

		// 发送消息
		err := writer.WriteMessages(
			ctx,
			segmentioKafka.Message{
				// Notice 上面在初始化 writer 的时候指定了 Balancer, 是用msg的Key做判断的～
				Key: []byte(userId),
				// Notice 从value的值来分析(有idx)，同一个用户id发送到同一个分区，是"顺序的"
				Value: []byte(fmt.Sprintf("Hello-%v-%v", idx, userId)),
			})
		if err != nil {
			fmt.Println("Failed to send message:", err)
		} else {
			fmt.Println("Message sent successfully")
		}
	}

	// 关闭 Kafka 生产者
	if err := writer.Close(); err != nil {
		fmt.Println("Failed to close Kafka writer:", err)
	}
}

func Hash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

//func Hash(s string) uint32 {
//	var Hash uint32
//	for i := 0; i < len(s); i++ {
//		Hash += uint32(s[i])
//		Hash += (Hash << 10)
//		Hash ^= (Hash >> 6)
//	}
//
//	Hash += (Hash << 3)
//	Hash ^= (Hash >> 11)
//	Hash += (Hash << 15)
//	return Hash
//}
